大数据物流项目:主题及报表开发(十二)

您所在的位置:网站首页 impala hive kudu hbase 大数据物流项目:主题及报表开发(十二)

大数据物流项目:主题及报表开发(十二)

#大数据物流项目:主题及报表开发(十二)| 来源: 网络整理| 查看: 265

持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第16天,点击查看活动详情

Logistics_Day12:主题及报表开发

1613867341272

01-[复习]-上次课程内容回顾

​ 主要讲解2个方面内容:==离线报表分析(SparkSQL)和即席查询分析(Impala)==,数据都存储在Kudu数据库中,结构化流程序实时消费Kafka数据,ETL转换后存储到Kudu表中。

==1)、离线报表分析==

技术框架(分析引擎):SparkSQL,数据结构DataFrame/Dataset 按照数据仓库分层管理数据:三层架构(ODS、DWD、DWS),便于管理数据和开发使用 依据主题划分业务报表,每个主题报表开发,需要2步操作: 第一步、事实表(业务数据表)与相关维度表数据进行关联拉宽操作,DWD层 第二步、对宽表数据按照业务指标进行计算,DWS层 公共接口BasicOfflineApp,提供三个方法:load加载数据、process处理数据和save保存数据

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6guAQyL2-1641191197136)(/img/1616375483879.png)]

1613962944032

​ 每个主题报表分析时,先进行数据拉宽操作,再进行指标计算。其中指标计算,按照日期day划分数据,对每日业务数据进行分析统计,封装到Row对象中,最后将分析指标转换DataFrame进行返回。

1613962957638

2)、==即席查询分析(Ad Hoc)== 使用技术框架(分析引擎):Impala 基于内存分析引擎 SQL on Hadoop 技术框架发展史(Hive:HDFS和HBase -> Impala:HDFS和HBase -> Impala:Kudu) Impala 与Hive框架比较:取代Hive,使用内存进行分布式查询分析;依赖Hive MetaStore管理元数据 Impala 服务架构(三个组件) Impalad服务(守护进程),主要接收SQL语句,转换查询计划,分发计划任务,执行任务,收集结果给Client返回。 QueryPlanner、QueryCoordinator、QueryExecutor StateStored服务,同步集群中所有Impalad服务状态到所有Impalad服务 Catalogd服务,同步元数据到所有Impalad服务,启动时加载HiveMetaStore元数据 Impala分析引擎提供交互式命令:impala-shell,可以直接连接impalad服务 Impala如何查询分析数据流程

1613896374570

如何使用Impala集成Kudu创建表,有2种方式:管理表和外部表(推荐使用) Hue与Impala集成,提供SQL编写界面,底层使用Impala分析数据。 02-[了解]-第12天:课程内容提纲

主要讲解:离线报表数据分析(2个主题:运单主题(讲解)和仓库主题(作业))。

1)、离线报表分析,按照主题topic划分业务报表 每个主题报表开发,按照数据仓库分层管理数据和开发指标,需要编写2个SparkSQL应用 DWD层:数据拉宽,将事实表数据与维度表数据进行关联,leftJoin(大表在左,小表在右) DWS层:指标计算,加载宽表数据按照指标需求进行计算,按照每天数据进行指标计算 重构公共接口:BasicOfflineApp,使用模板方法(Template)设计模式重构接口 load加载数据、process处理数据和save保存数据

1614042929181

3)、Kudu 原理和优化 Kudu数据存储引擎,类似HBase表,Kudu底层数据存储原理结构 使用Kudu时注意事项和优化设置 03-[理解]-运单主题之数据调研及业务分析

​ 在物流快递行业,除了快递单(tbl_express_bill)主要业务数据以外,就是运单(tbl_waybill)数据。

1614044001296

​ “运单是运输合同的证明,是承运人已经接收货物的收据。一份运单,填写托运人、收货人、起运港、到达港。如同一托运人的货物分别属到达港的两个或两个以上收货人,则应分别填制运单。”

​ 运单统计根据区域Area、公司Company、网点Dot、线路Route、运输工具Tool等维度进行统计,可以对各个维度运单数量进行排行,如对网点运单进行统计可以反映该网点的运营情况,对线路运单进行统计可以观察每个线路的运力情况。

1)、运单主题指标

1616378623511

1616378639284

按照6个维度统计运单数量及最大、最小和平均运单数。

2)、业务数据(事实表数据)

1616378705865

3)、维度表

1616378852556

4)、事实表与维度表关联关系图

1616378866761

可以按照主题报表开发步骤进行编码:数据拉宽(DWD层)和指标计算(DWS层)。

04-[掌握]-主题及指标开发之重构公共接口【思路】

前面对【快递单】数据进行指标开发,但是发现问题,有大量代码重复,尤其DWD层和DWS层中MAIN方法:

1)、快递单:DWD层程序【MAIN方法】

1616379126981

2)、快递单:DWS层程序【MAIN方法】

1616379138139

​ 发现2个MAIN方法中,步骤几乎一样的(step1、step2、step3、….),只不过是传递参数不一样而已(比如加载数据时表的名称不一样、处理数据时数据集不一样、保存数据时数据集不一样等),所以可以考虑重构公共接口,采用模板方法设计模式Template Parttern,将main方法中代码封装到方法:execute,传递不同参数即可。

​ 模板方法模式(Template Pattern),是一种类继承模式,主要是==通过一个抽象类,把子类一些共有的类提取出来(称为基本方法)放到抽象类中,并在抽象类中定义一个模板方法,在模板方法中规定基本方法的执行顺序==。将不同的实现细节交给子类去实现。

1616379483662

模板方法设计模式使用场景:

1616379554837

案例说明:小明和小华去学校上学前准备工作(叠被子、吃早餐和去学校)

1616379665319

==假设重构接口:AbstractOfflineApp,其中方法如下所示:==

将SparkSession会话实例创建和关闭,分别封装到不同方法中(基本方法),可以具体实现 定义模板方法:execute,规定基本方法执行顺序,将mian方法中代码移到此方法中,调用方法时桉树进行传递即可。

1614045318315

package cn.itcast.logistics.offline import org.apache.spark.sql.{DataFrame, SparkSession} trait AbstractOfflineApp { // 定义变量 private var spark: SparkSession = _ // 实例化spark对象 def init(clazz: Class[_]): Unit = { } // 从Kudu表加载数据 def loadKuduSource(spark: SparkSession, tableName: String, isLoadFull: Boolean = false): DataFrame = ??? // 处理数据,要么是数据拉宽,要么是指标计算 def process(dataframe: DataFrame): DataFrame // 保存数据到Kudu表 def saveKuduSink(dataframe: DataFrame, tableName: String, keys: Seq[String] = Seq("id")) // 关闭会话实例对象 def close(): Unit = { } // TODO: 定义模块方法,规定基本方法执行顺序 def execute(clazz: Class[_], srcTable: String, isLoadFull: Boolean = false, dstTable: String, keys: Seq[String] = Seq("id")): Unit = { // step1. 初始化 init(clazz) try{ // step2. 加载Kudu表数据 val kuduDF: DataFrame = loadKuduSource(spark, srcTable, isLoadFull) kuduDF.show(10, truncate = false) // step3. 处理数据 val resultDF: DataFrame = process(kuduDF) resultDF.show(10, truncate = false) // step4. 保存数据 saveKuduSink(resultDF, dstTable, keys) }catch { case e: Exception => e.printStackTrace() }finally { // step5. 关闭资源 close() } } } 复制代码 05-[掌握]-主题及指标开发之重构公共接口【编程】

任务:前面已经定义好公共接口方法声明,实现基本方法代码。

package cn.itcast.logistics.offline import cn.itcast.logistics.common.{Configuration, KuduTools, SparkUtils} import org.apache.spark.SparkConf import org.apache.spark.sql.functions.{col, current_date, date_format, date_sub} import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} /** * 离线报表分析公共接口,采用模板方法设计模式构建:基本方法和模板方法 */ trait AbstractOfflineApp { // 定义变量 private var spark: SparkSession = _ // 实例化spark对象 def init(clazz: Class[_]): Unit = { // a. 构建SparkConf对象,基本设置 var sparkConf: SparkConf = SparkUtils.sparkConf() // b. 设置运行方式 sparkConf = SparkUtils.autoSettingEnv(sparkConf) // c. 构建SparkSession对象 spark = SparkUtils.createSparkSession(sparkConf, this.getClass) spark.sparkContext.setLogLevel(Configuration.LOG_OFF) } // 从Kudu表加载数据 def loadKuduSource(tableName: String, isLoadFullData: Boolean = false): DataFrame = { // 加载Kudu表数据,不考虑全量还是增量 var kuduDF: DataFrame = spark.read .format(Configuration.SPARK_KUDU_FORMAT) .option("kudu.master", Configuration.KUDU_RPC_ADDRESS) .option("kudu.table", tableName) .option("kudu.socketReadTimeoutMs", "60000") .load() // 如果是增量加载数据,表示加载昨日数据,需要过滤操作 if(!isLoadFullData){ kuduDF = kuduDF // 依据 每个表中字段:cdt = 2013-06-02 21:24:00,过滤数据 .filter( date_sub(current_date(), 1) === date_format(col("cdt"), "yyyy-MM-dd") ) } // 返回数据 kuduDF } // 处理数据,要么是数据拉宽,要么是指标计算 def process(dataframe: DataFrame): DataFrame // 保存数据到Kudu表 def saveKuduSink(dataframe: DataFrame, tableName: String, isAutoCreateTable: Boolean = true, keys: Seq[String] = Seq("id")) = { // 如果允许创建表,并且表不存在,就创建表 if(isAutoCreateTable){ KuduTools.createKuduTable(tableName, dataframe, keys) } // 保存数据到Kudu表 dataframe.write .mode(SaveMode.Append) .format(Configuration.SPARK_KUDU_FORMAT) .option("kudu.master", Configuration.KUDU_RPC_ADDRESS) .option("kudu.table", tableName) .option("kudu.operation", "upsert") .save() } // 关闭会话实例对象 def close(): Unit = { if(null != spark) spark.close() } // TODO: 定义模块方法,规定基本方法执行顺序 def execute(clazz: Class[_], srcTable: String, dstTable: String, isLoadFullData: Boolean = false, isAutoCreateTable: Boolean = true, keys: Seq[String] = Seq("id")): Unit = { // step1. 初始化 init(clazz) try{ // step2. 加载Kudu表数据 val kuduDF: DataFrame = loadKuduSource(srcTable, isLoadFullData) kuduDF.show(10, truncate = false) // step3. 处理数据 val resultDF: DataFrame = process(kuduDF) resultDF.show(10, truncate = false) // step4. 保存数据 saveKuduSink(resultDF, dstTable, isAutoCreateTable, keys) }catch { case e: Exception => e.printStackTrace() }finally { // step5. 关闭资源 close() } } } 复制代码

在模板方法中调用基本方法(具体方法:有方法体和抽象方法:子类具体实现),定义调用步骤。

06-[掌握]-运单主题之数据拉宽开发

​ 任务:针对运单主题进行DWD层数据拉宽开发,创建对象WayBillDWD对象,继承公共接口AbstractOfflineApp,实现process方法即可。

1)、将运单数据与相关维度数据,进行关联JION以后,字段如下:

1616381706248

当不知道获取那些字段时,最简单和粗暴方式:获取所有字段,不客气,不建议。

2)、SQL语句

1616381787869

3)、创建对象,编写代码,将事实表数据与维度表数据,进行拉宽操作

在dwd目录下创建 WayBillDWD 单例对象,继承自AbstractOfflineApp特质

package cn.itcast.logistics.offline.dwd import cn.itcast.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, TableMapping} import cn.itcast.logistics.offline.AbstractOfflineApp import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ /** * 运单主题开发: * 将运单事实表的数据与相关维度表的数据进行关联,然后将拉宽后的数据写入到运单宽表中 */ object WayBillDWD extends AbstractOfflineApp{ /** * 将事实表与维度表关联,进行拉宽操作 */ override def process(dataframe: DataFrame): DataFrame = { // 导入隐式转换 import dataframe.sparkSession.implicits._ // step1. 加载维度表数据 // 加载快递员表 val courierDF: DataFrame = loadKuduSource(TableMapping.COURIER, isLoadFullData = true) // 加载网点表 val dotDF: DataFrame = loadKuduSource(TableMapping.DOT, isLoadFullData = true) // 加载区域表 val areasDF: DataFrame = loadKuduSource(TableMapping.AREAS, isLoadFullData = true) // 加载转运记录表 val recordDF: DataFrame = loadKuduSource(TableMapping.TRANSPORT_RECORD, isLoadFullData = true) // 加载起始仓库表 val startWarehouseDF: DataFrame = loadKuduSource(TableMapping.WAREHOUSE, isLoadFullData = true) // 加载到达仓库表 val endWarehouseDF: DataFrame = loadKuduSource(TableMapping.WAREHOUSE, isLoadFullData = true) // 加载车辆表 val toolDF: DataFrame = loadKuduSource(TableMapping.TRANSPORT_TOOL, isLoadFullData = true) // 加载线路表 val routeDF: DataFrame = loadKuduSource(TableMapping.ROUTE, isLoadFullData = true) // 加载起始仓库关联表 val startCompanyWarehouseDF: DataFrame = loadKuduSource(TableMapping.COMPANY_WAREHOUSE_MAP, isLoadFullData = true) // 加载到达仓库关联表 val endCompanyWarehouseDF: DataFrame = loadKuduSource(TableMapping.COMPANY_WAREHOUSE_MAP, isLoadFullData = true) // 加载起始仓库所在公司表 val startCompanyDF: DataFrame = loadKuduSource(TableMapping.COMPANY, isLoadFullData = true) // 加载到达仓库所在公司表 val endCompanyDF: DataFrame = loadKuduSource(TableMapping.COMPANY, isLoadFullData = true) // 加载物流码表 val codesDF: DataFrame = loadKuduSource(TableMapping.CODES, isLoadFullData = true) // 加载客户表 val customerDF: DataFrame = loadKuduSource(TableMapping.CUSTOMER, isLoadFullData = true) // 下单渠道类型表 val orderChannelTypeDF: DataFrame = codesDF .where(col("type") === CodeTypeMapping.ORDER_CHANNEL_TYPE) .select( col("code").as("orderChannelTypeCode"), col("codeDesc").as("orderChannelTypeName") ) // 客户类型表 val customerTypeDF: DataFrame = codesDF .where(col("type") === CodeTypeMapping.CUSTOM_TYPE) .select( col("code").as("customerTypeCode"), col("codeDesc").as("customerTypeName") ) // step2. 将事实表与维度表关联 val left_outer = "left_outer" val wayBillDF: DataFrame = dataframe val joinDF: DataFrame = wayBillDF // 运单表与快递员表进行关联 .join(courierDF, wayBillDF("eid") === courierDF("id"), left_outer) // 网点表与快递员表进行关联 .join(dotDF, courierDF("dotId") === dotDF("id"), left_outer) // 网点表与区域表进行关联 .join(areasDF, areasDF("id") === dotDF("manageAreaId"), left_outer) // 转运记录表与运单表关联 .join(recordDF, recordDF("pwWaybillNumber") === wayBillDF("waybillNumber"), left_outer) // 起始仓库与转运记录表关联 .join(startWarehouseDF, startWarehouseDF("id") === recordDF("swId"), left_outer) // 到达仓库与转运记录表关联 .join(endWarehouseDF, endWarehouseDF("id") === recordDF("ewId"), left_outer) // 转运记录表与交通工具表关联 .join(toolDF, toolDF("id") === recordDF("transportToolId"), left_outer) // 转运记录表与路线表关联 .join(routeDF, routeDF("id") === recordDF("routeId"), left_outer) // 起始仓库表与仓库公司关联表关联 .join(startCompanyWarehouseDF, startCompanyWarehouseDF("warehouseId") === startWarehouseDF("id"), left_outer) // 公司表与起始仓库公司关联表关联 .join(startCompanyDF, startCompanyDF("id") === startCompanyWarehouseDF("companyId"), left_outer) // 到达仓库表与仓库公司关联表关联 .join(endCompanyWarehouseDF, endCompanyWarehouseDF("warehouseId") === endWarehouseDF("id"), left_outer) // 公司表与到达仓库公司关联表关联 .join(endCompanyDF, endCompanyDF("id") === endCompanyWarehouseDF("companyId"), left_outer) // 运单表与客户表关联 .join(customerDF, customerDF("id") === wayBillDF("cid"), left_outer) // 下单渠道表与运单表关联 .join(orderChannelTypeDF, orderChannelTypeDF("orderChannelTypeCode") === wayBillDF("orderChannelId"), left_outer) // 客户类型表与客户表关联 .join(customerTypeDF, customerTypeDF("customerTypeCode") === customerDF("type"), left_outer) // step3. 选择字段和添加日期day val wayBillDetailDF: Dataset[Row] = joinDF // 选择字段 .select( wayBillDF("id"), //运单id wayBillDF("expressBillNumber").as("express_bill_number"), //快递单编号 wayBillDF("waybillNumber").as("waybill_number"), //运单编号 wayBillDF("cid"), //客户id customerDF("name").as("cname"), //客户名称 customerDF("type").as("ctype"), //客户类型 customerTypeDF("customerTypeName").as("ctype_name"), //客户类型名称 wayBillDF("eid"), //快递员id courierDF("name").as("ename"), //快递员名称 dotDF("id").as("dot_id"), //网点id dotDF("dotName").as("dot_name"), //网点名称 areasDF("id").as("area_id"), //区域id areasDF("name").as("area_name"), //区域名称 wayBillDF("orderChannelId").as("order_channel_id"), //渠道id orderChannelTypeDF("orderChannelTypeName").as("order_chanel_name"), //渠道名称 wayBillDF("orderDt").as("order_dt"), //下单时间 wayBillDF("orderTerminalType").as("order_terminal_type"), //下单设备类型 wayBillDF("orderTerminalOsType").as("order_terminal_os_type"), //下单设备操作系统类型 wayBillDF("reserveDt").as("reserve_dt"), //预约取件时间 wayBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"), //是否取件超时 wayBillDF("pkgId").as("pkg_id"), //订装ID wayBillDF("pkgNumber").as("pkg_number"), //订装编号 wayBillDF("timeoutDt").as("timeout_dt"), //超时时间 wayBillDF("transformType").as("transform_type"), //运输方式 wayBillDF("deliveryAddr").as("delivery_addr"), wayBillDF("deliveryCustomerName").as("delivery_customer_name"), wayBillDF("deliveryMobile").as("delivery_mobile"), wayBillDF("deliveryTel").as("delivery_tel"), wayBillDF("receiveAddr").as("receive_addr"), wayBillDF("receiveCustomerName").as("receive_customer_name"), wayBillDF("receiveMobile").as("receive_mobile"), wayBillDF("receiveTel").as("receive_tel"), wayBillDF("cdt"), wayBillDF("udt"), wayBillDF("remark"), recordDF("swId").as("sw_id"), startWarehouseDF("name").as("sw_name"), startCompanyDF("id").as("sw_company_id"), startCompanyDF("companyName").as("sw_company_name"), recordDF("ewId").as("ew_id"), endWarehouseDF("name").as("ew_name"), endCompanyDF("id").as("ew_company_id"), endCompanyDF("companyName").as("ew_company_name"), toolDF("id").as("tt_id"), toolDF("licensePlate").as("tt_name"), recordDF("routeId").as("route_id"), concat(routeDF("startStation"), routeDF("endStation")).as("route_name") ) // 添加字段,日期字段day增加日期列 .withColumn("day", date_format(wayBillDF("cdt"), "yyyyMMdd")) // 根据运单表的创建时间顺序排序 .sort(wayBillDF.col("cdt").asc) // 返回拉宽数据 wayBillDetailDF } // SparkSQL程序入口 def main(args: Array[String]): Unit = { // TODO: 调用父类中模板方法,传递相关参数即可 execute( this.getClass, // TableMapping.WAY_BILL, // 事实表 OfflineTableDefine.WAY_BILL_DETAIL, // 宽表 isLoadFullData = Configuration.IS_FIRST_RUNNABLE // ) } } 复制代码

注意:在DWD层程序中,mian仅仅调度父类模板方法,传递参数即可。

1616382579014

07-[掌握]-运单主题之指标计算【MAIN 方法】

任务:从Kudu数据库中加载运单宽表数据,按照指标进行计算,创建WayBillDWS对象,继承公共接口。

1)、指标字段,首先获取总的运单数及各个维度统计最大、最小和平均运单数目。

1616382689162

1616382707111

2)、Spark 编程实现

在dws目录下创建 WayBillDWS 单例对象,继承自AbstractOfflineApp特质

package cn.itcast.logistics.offline.dws import cn.itcast.logistics.common.{Configuration, OfflineTableDefine} import cn.itcast.logistics.offline.AbstractOfflineApp import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.functions._ /** * 运单主题指标开发: * 从Kudu表加载宽表数据,按照业务指标进行统计分析:基于不同维度分组聚合,类似快递单指表指标。 */ object WayBillDWS extends AbstractOfflineApp{ /** * 按照业务指标进行计算,按照日期day进行统计,最后返回DataFrame数据集 */ override def process(dataframe: DataFrame): DataFrame = { // 导入隐式转换 val session: SparkSession = dataframe.sparkSession import session.implicits._ // a. 获取日期值day // b. 遍历日期值,划分宽表数据,对每天数据进行指标统计 // c. 将指标结果转换DataFrame // 返回指标结果 } def main(args: Array[String]): Unit = { // 调用父类中模板方法,传递参数 execute( this.getClass, // OfflineTableDefine.WAY_BILL_DETAIL, OfflineTableDefine.WAY_BILL_SUMMARY, // isLoadFullData = Configuration.IS_FIRST_RUNNABLE // ) } } 复制代码 08-[掌握]-运单主题之指标计算【process 方法】

任务:实现process方法,先进行指标计算,按照每天数据进行指标统计,示意图如下所示:

1616339707049

具体指标计算代码如下所示:

// 导入隐式转换 val session: SparkSession = dataframe.sparkSession import session.implicits._ // a. 获取日期值day val days: Array[Row] = dataframe.select($"day").distinct().collect() // b. 遍历日期值,划分宽表数据,对每天数据进行指标统计 days.map{dayRow => // 获取日期值 val dayValue: String = dayRow.getString(0) // 过滤每日数据 val wayBillDetailDF: Dataset[Row] = dataframe.filter($"day" === dayValue) wayBillDetailDF.persist(StorageLevel.MEMORY_AND_DISK) // 指标计算 // 指标一:总运单数 val totalDF: DataFrame = wayBillDetailDF.agg(count("id").as("total")) // 指标二:各区域运单数,最大、最小和平均 val areaTotalDF: DataFrame = wayBillDetailDF.groupBy($"area_id").count() val areaTotalAggDF: DataFrame = areaTotalDF.agg( max($"count").as("areaMaxTotal"), min($"count").as("areaMinTotal"), round(avg($"count"), 0).as("areaAvgTotal") ) // 指标三:各分公司运单数,最大、最小和平均 val companyTotalDF: DataFrame = wayBillDetailDF.groupBy($"sw_company_name").count() val companyTotalAggDF: DataFrame = companyTotalDF.agg( max($"count").as("companyMaxTotal"), min($"count").as("companyMinTotal"), round(avg($"count"), 0).as("companyAvgTotal") ) // 指标四:各网点运单数,最大、最小和平均 val dotTotalDF: DataFrame = wayBillDetailDF.groupBy($"dot_id").count() val dotTotalAggDF: DataFrame = dotTotalDF.agg( max($"count").as("dotMaxTotal"), min($"count").as("dotMinTotal"), round(avg($"count"), 0).as("dotAvgTotal") ) // 指标五:各线路运单数,最大、最小和平均 val routeTotalDF: DataFrame = wayBillDetailDF.groupBy($"route_id").count() val routeTotalAggDF: DataFrame = routeTotalDF.agg( max($"count").as("routeMaxTotal"), min($"count").as("routeMinTotal"), round(avg($"count"), 0).as("routeAvgTotal") ) // 指标六:各运输工具运单数,最大、最小和平均 val ttTotalDF: DataFrame = wayBillDetailDF.groupBy($"tt_id").count() val ttTotalAggDF: DataFrame = ttTotalDF.agg( max($"count").as("ttMaxTotal"), min($"count").as("ttMinTotal"), round(avg($"count"), 0).as("ttAvgTotal") ) // 指标七:各类客户运单数,最大、最小和平均 val typeTotalDF: DataFrame = wayBillDetailDF.groupBy($"ctype").count() val typeTotalAggDF: DataFrame = typeTotalDF.agg( max($"count").as("typeMaxTotal"), min($"count").as("typeMinTotal"), round(avg($"count"), 0).as("typeAvgTotal") ) // 数据不再使用时,释放资源 wayBillDetailDF.unpersist() // 将计算指标封装到Row中 Row.fromSeq( dayRow.toSeq ++ totalDF.first().toSeq ++ areaTotalAggDF.first().toSeq ++ companyTotalAggDF.first().toSeq ++ dotTotalAggDF.first().toSeq ++ routeTotalAggDF.first().toSeq ++ ttTotalAggDF.first().toSeq ++ typeTotalAggDF.first().toSeq ) } 复制代码 09-[掌握]-运单主题之指标计算【转换DataFrame】

任务:将统计指标(封装在数组)转换为DataFrame数据集,采用自定义Schema方式完成。

// c. 将指标结果转换DataFrame // i. RDD[Row],采用并行化方式将数组转换为RDD val aggRDD: RDD[Row] = session.sparkContext.parallelize(aggRow) // ii. schema,自定义Schema信息 val schema: StructType = new StructType() .add("id", StringType, nullable = false) .add("total", LongType, nullable = true) .add("maxAreaTotal", LongType, nullable = true) .add("minAreaTotal", LongType, nullable = true) .add("avgAreaTotal", DoubleType, nullable = true) .add("maxCompanyTotal", LongType, nullable = true) .add("minCompanyTotal", LongType, nullable = true) .add("avgCompanyTotal", DoubleType, nullable = true) .add("maxDotTotal", LongType, nullable = true) .add("minDotTotal", LongType, nullable = true) .add("avgDotTotal", DoubleType, nullable = true) .add("maxRouteTotal", LongType, nullable = true) .add("minRouteTotal", LongType, nullable = true) .add("avgRouteTotal", DoubleType, nullable = true) .add("maxToolTotal", LongType, nullable = true) .add("minToolTotal", LongType, nullable = true) .add("avgToolTotal", DoubleType, nullable = true) .add("maxCtypeTotal", LongType, nullable = true) .add("minCtypeTotal", LongType, nullable = true) .add("avgCtypeTotal", DoubleType, nullable = true) // iii. 转换RDD为DataFrame val aggDF: DataFrame = session.createDataFrame(aggRDD, schema) // 返回指标结果 aggDF 复制代码

运单主题指标计算完整代码:

package cn.itcast.logistics.offline.dws import cn.itcast.logistics.common.{Configuration, OfflineTableDefine} import cn.itcast.logistics.offline.AbstractOfflineApp import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType} import org.apache.spark.storage.StorageLevel /** * 运单主题指标开发: * 从Kudu表加载宽表数据,按照业务指标进行统计分析:基于不同维度分组聚合,类似快递单指表指标。 */ object WayBillDWS extends AbstractOfflineApp{ /** * 按照业务指标进行计算,按照日期day进行统计,最后返回DataFrame数据集 */ override def process(dataframe: DataFrame): DataFrame = { // 导入隐式转换 val session: SparkSession = dataframe.sparkSession import session.implicits._ // a. 获取日期值day val days: Array[Row] = dataframe.select($"day").distinct().collect() // b. 遍历日期值,划分宽表数据,对每天数据进行指标统计 val aggRow: Array[Row] = days.map{ dayRow => // 获取日期值 val dayValue: String = dayRow.getString(0) // 过滤每日数据 val wayBillDetailDF: Dataset[Row] = dataframe.filter($"day" === dayValue) wayBillDetailDF.persist(StorageLevel.MEMORY_AND_DISK) // 指标计算 // 指标一:总运单数 val totalDF: DataFrame = wayBillDetailDF.agg(count("id").as("total")) // 指标二:各区域运单数,最大、最小和平均 val areaTotalDF: DataFrame = wayBillDetailDF.groupBy($"area_id").count() val areaTotalAggDF: DataFrame = areaTotalDF.agg( max($"count").as("areaMaxTotal"), min($"count").as("areaMinTotal"), round(avg($"count"), 0).as("areaAvgTotal") ) // 指标三:各分公司运单数,最大、最小和平均 val companyTotalDF: DataFrame = wayBillDetailDF.groupBy($"sw_company_name").count() val companyTotalAggDF: DataFrame = companyTotalDF.agg( max($"count").as("companyMaxTotal"), min($"count").as("companyMinTotal"), round(avg($"count"), 0).as("companyAvgTotal") ) // 指标四:各网点运单数,最大、最小和平均 val dotTotalDF: DataFrame = wayBillDetailDF.groupBy($"dot_id").count() val dotTotalAggDF: DataFrame = dotTotalDF.agg( max($"count").as("dotMaxTotal"), min($"count").as("dotMinTotal"), round(avg($"count"), 0).as("dotAvgTotal") ) // 指标五:各线路运单数,最大、最小和平均 val routeTotalDF: DataFrame = wayBillDetailDF.groupBy($"route_id").count() val routeTotalAggDF: DataFrame = routeTotalDF.agg( max($"count").as("routeMaxTotal"), min($"count").as("routeMinTotal"), round(avg($"count"), 0).as("routeAvgTotal") ) // 指标六:各运输工具运单数,最大、最小和平均 val ttTotalDF: DataFrame = wayBillDetailDF.groupBy($"tt_id").count() val ttTotalAggDF: DataFrame = ttTotalDF.agg( max($"count").as("ttMaxTotal"), min($"count").as("ttMinTotal"), round(avg($"count"), 0).as("ttAvgTotal") ) // 指标七:各类客户运单数,最大、最小和平均 val typeTotalDF: DataFrame = wayBillDetailDF.groupBy($"ctype").count() val typeTotalAggDF: DataFrame = typeTotalDF.agg( max($"count").as("typeMaxTotal"), min($"count").as("typeMinTotal"), round(avg($"count"), 0).as("typeAvgTotal") ) // 数据不再使用时,释放资源 wayBillDetailDF.unpersist() // 将计算指标封装到Row中 Row.fromSeq( dayRow.toSeq ++ totalDF.first().toSeq ++ areaTotalAggDF.first().toSeq ++ companyTotalAggDF.first().toSeq ++ dotTotalAggDF.first().toSeq ++ routeTotalAggDF.first().toSeq ++ ttTotalAggDF.first().toSeq ++ typeTotalAggDF.first().toSeq ) } // c. 将指标结果转换DataFrame // i. RDD[Row],采用并行化方式将数组转换为RDD val aggRDD: RDD[Row] = session.sparkContext.parallelize(aggRow) // ii. schema,自定义Schema信息 val schema: StructType = new StructType() .add("id", StringType, nullable = false) .add("total", LongType, nullable = true) .add("maxAreaTotal", LongType, nullable = true) .add("minAreaTotal", LongType, nullable = true) .add("avgAreaTotal", DoubleType, nullable = true) .add("maxCompanyTotal", LongType, nullable = true) .add("minCompanyTotal", LongType, nullable = true) .add("avgCompanyTotal", DoubleType, nullable = true) .add("maxDotTotal", LongType, nullable = true) .add("minDotTotal", LongType, nullable = true) .add("avgDotTotal", DoubleType, nullable = true) .add("maxRouteTotal", LongType, nullable = true) .add("minRouteTotal", LongType, nullable = true) .add("avgRouteTotal", DoubleType, nullable = true) .add("maxToolTotal", LongType, nullable = true) .add("minToolTotal", LongType, nullable = true) .add("avgToolTotal", DoubleType, nullable = true) .add("maxCtypeTotal", LongType, nullable = true) .add("minCtypeTotal", LongType, nullable = true) .add("avgCtypeTotal", DoubleType, nullable = true) // iii. 转换RDD为DataFrame val aggDF: DataFrame = session.createDataFrame(aggRDD, schema) // 返回指标结果 aggDF } def main(args: Array[String]): Unit = { // 调用父类中模板方法,传递参数 execute( this.getClass, // OfflineTableDefine.WAY_BILL_DETAIL, OfflineTableDefine.WAY_BILL_SUMMARY, // isLoadFullData = Configuration.IS_FIRST_RUNNABLE // ) } } 复制代码

面试题:RDD、DataFrame和Dataset之间关系和区别???

RDD是什么、DataFrame = RDD[Row] + schema 、Dataset = RDD[CaseClass] + schema、DataFrame = Dataset[Row]

10-[理解]-Kudu 原理及优化之数据存储模型

任务:针对Kudu存储引擎,底层数据存储原理和模型。

大数据存储引擎: - HDFS 分布式文件系统 从HDFS读写数据流程 - HBase 相关知识点 知识点:表如何设计的 ???RowKey如何设计??? 知识点:从HBase表读写数据流程是啥???? minor compaction:小合并,将多个storefile文件合并为多个文件 major compaction:大合并,将所有storefile文件合并为一个文件 复制代码

1614062789027

1)、表table与约束schema schema:字段名称、字段类型、是否为空,是否为主键 table数据划分为多个tablet 分区策略(如何划分数据)和副本数(数据安全性)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pyJOpRFZ-1641191197147)(/img/1616396524218.png)]

2)、Kudu 底层数据模型,非常类似HBase数据底层数据存储模型

1614062859258

​ Kudu的底层数据文件的存储,未采用HDFS这样的较高抽象层次的分布式文件系统,而是自行开发了一套可基于Table/Tablet/Replica视图级别的底层存储系统

1616396752594

1)、一个Table会被分成若干个tablet,其中Tablet的数量是根据hash或者是range进行设置的 2)、一个Tablet中包含MetaData信息和多个RowSet信息,其中MetaData信息是block和block在data中的位置。 3)、一个RowSet包含一个MemRowSet和多个DiskRowSet,其中MemRowSet用于存储insert数据和update后的数据,写满后会刷新到磁盘中也就是多个DiskRowSet中,默认是1G刷新一次或者是2分钟。 4)、DiskRowSet用于老数据的mutation(改变),比如说数据的更新操作,后台定期对DiskRowSet 进行合并操作,删除历史数据和没有的数据,减少查询过程中的IO开销 5)、一个DiskRowSet包含1个BloomFilter,1个Ad_hoc Index,多个UndoFile、RedoFile、BaseData、DeltaMem

有两个在内存中处理的数据集,区别如下:

MemRowSet:存储新增的数据,对该内存数据集中还未flush的数据的更新; 新插入数据insert(以前表中没有,主键不存在)。 更新后的数据(update以后,新的数据) DeltaMem:对已flush到磁盘内的数据的更新; 某个数据从memRowSet刷新到DiskRowSet磁盘中,用户对此数据进行更新,将更新数据放到DeltaMem内存中 当对DiskRowSet磁盘中老数据和DelteMem中要更新数据进行合并以后,再次放到MemRowSet中

1616396995152

在KUDU中,把DiskRowSet分为了两部分:

1)、base data: 负责存储基础数据 2)、delta stores:delta stores负责存储 base data 中的变更数据. DeltaMem,存储更新数据 DeltaFile,存储变更以后的数据

1616397248377

​ 数据从 MemRowSet 刷到磁盘后就形成了一份 DiskRowSet(只包含 basedata),每份 DiskRowSet 在内存中都会有一个对应的 DeltaMemStore,负责记录此 DiskRowSet后续的数据变更(更新、删除)。 DeltaMemStore 内部维护一个 B-树索引,映射到每个 row_offset对应的数据变更。DeltaMemStore 数据增长到一定程度后转化成二进制文件存储到磁盘,形成一个 DeltaFile,随着 base data 对应数据的不断变更,DeltaFile 逐渐增长。

11-[理解]-Kudu 原理及优化之数据读写原理

任务:当从Kudu表读写(读取数据、插入数据、更新数据、删除数据)数据时,原理流程是如何的。

1)、Kudu的工作模式如下图

1616397619688

上图中编号对应功能描述,如下所示:

1616397899658

2)、Kudu 读流程

1616397919792

从Kudu读取数据时:

第一点:两次过滤定位,首先对应Tablet Follower·,然后获取DiskRowSet 第二点:先从DiskRowSet读取数据,再读取MemRowSet,最后合并数据,返回Client 3)、Kudu 写流程

1616398209135

当向Kudu表中写入数据时,进行三次判断插入数据主键是否存在,如果不存在,再进行插入数据:

第一次、主键范围过滤 第二次、布隆过滤器 第三次、B-树索引过滤

4)、Kudu 更新流程

更新删除流程与写入流程类似,区别就是最后判断是否存在主键时候的操作,若存在才能更新,不存在才能插入新数据。

1616398394710

更新和删除数据,与插入数据类型,需要经过三次过滤判断,当主键存在时,才进行更新和删除操作。

12-[理解]-Kudu 原理及优化之基本优化设置 1)、Kudu 关键配置,设置Kudu TabletServer 内存

1616399306832

2)、Kudu 使用限制

1616399377811

3)、字段

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ihVh0pHo-1641191197151)(img/1616399441166.png)]

4)、表的设置

1616399481288

5)、分区限制

1616399538415

6)、扩展建议和限制

1616399599852

7)、守护进程,Master和TabletServer内存配置

1616399657809

8)、集群管理限制

1616399700991

9)、Spark 集成限制

1616399776432

【作业】-仓库主题报表开发 需求: 参考建议【1.5 仓库主题】,按照讲解【运单主题】报表开发步骤进行编程实现 第一、DWD层,开发程序:WarehouseDWD,继承接口AbstractOfflineApp 第二、DWS层,开发程序:WarehouseDWS,继承接口AbstractOfflineApp 复制代码 面试题: 如何对Kudu表数据进行备份操作???? 思路: SparkSQL程序,从Kudu读取数据,写入HDFS文件 复制代码


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3